Source code for hysop.backend.host.host_operator

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Abstract class providing a common interface to all
discrete operators working on the Host backend.

* :class:`~hysop.backend.host.host_operator.HostOperator` is an abstract class
    used to provide a common interface to all discrete operators working with the
    opencl backend.
"""
from abc import ABCMeta
from contextlib import contextmanager
from hysop.tools.decorators import debug
from hysop.tools.htypes import check_instance, first_not_None
from hysop.constants import ComputeGranularity, Backend
from hysop.core.graph.computational_operator import ComputationalGraphOperator
from hysop.topology.topology_descriptor import TopologyDescriptor


[docs] class HostOperatorBase(ComputationalGraphOperator, metaclass=ABCMeta): """ Abstract class for discrete operators working on cpu. HostOperatorBase ignore the extra cl_env keyword parameter. """ @debug def __new__(cls, cl_env=None, **kwds): return super().__new__(cls, **kwds) @debug def __init__(self, cl_env=None, **kwds): """ Create the common attributes of all host operators. All input and output variable topologies should be of kind Backend.HOST and share the same HostEnvironment. """ super().__init__(**kwds)
[docs] class HostOperator(HostOperatorBase, metaclass=ABCMeta): """ Abstract class for discrete operators working on cpu. HostOperator extra cl_env keyword parameter and enforces HOST backend. """
[docs] @classmethod def supported_backends(cls): """ Return the backends that this operator's topologies can support. """ return {Backend.HOST}
[docs] class OpenClMappedMemoryObjectGetter: def __new__(cls, obj, evt, **kwds): return super().__new__(cls, **kwds) def __init__(self, obj, evt, **kwds): super().__init__(**kwds) check_instance(obj, OpenClMappable) self.__obj = obj self.__evt = evt def __getitem__(self, key): return self.__obj.get_mapped_object(key=key) @property def evt(self): return self.__evt
[docs] class OpenClMappable: """ Extend host operator capabilities to work on mapped opencl buffers """
[docs] @classmethod def supported_backends(cls): sb = super().supported_backends() sb.add(Backend.OPENCL) return sb
[docs] @debug def create_topology_descriptors(self): if self.enable_opencl_host_buffer_mapping: # enforce opencl topology on host operator for field, topo_descriptor in self.input_fields.items(): topo_descriptor = TopologyDescriptor.build_descriptor( backend=Backend.OPENCL, operator=self, field=field, handle=topo_descriptor, cl_env=self.cl_env, ) self.input_fields[field] = topo_descriptor for field, topo_descriptor in self.output_fields.items(): topo_descriptor = TopologyDescriptor.build_descriptor( backend=Backend.OPENCL, operator=self, field=field, handle=topo_descriptor, cl_env=self.cl_env, ) self.output_fields[field] = topo_descriptor else: super().create_topology_descriptors()
def __new__( cls, cl_env=None, mpi_params=None, enable_opencl_host_buffer_mapping=False, **kwds, ): return super().__new__(cls, mpi_params=mpi_params, **kwds) def __init__( self, cl_env=None, mpi_params=None, enable_opencl_host_buffer_mapping=False, **kwds, ): if enable_opencl_host_buffer_mapping: msg = "OpenClMappable is an interface dedicated to extend HostOperator." assert isinstance(self, HostOperator), msg if cl_env is not None: if mpi_params is None: mpi_params = cl_env.mpi_params else: assert mpi_params == cl_env.mpi_params super().__init__(mpi_params=mpi_params, **kwds) self.__cl_env = cl_env self.__enable_opencl_host_buffer_mapping = enable_opencl_host_buffer_mapping self.__mapped = False self.__registered_objects = {} self.__registered_getters = {} self.__mapped_objects = {} def __del__(self): self.unmap_objects(force=True) @property def cl_env(self): return self.__cl_env @property def enable_opencl_host_buffer_mapping(self): return self.__enable_opencl_host_buffer_mapping
[docs] def setup(self, **kwds): super().setup(**kwds) self._register_fields()
def _register_fields(self): from hysop.fields.discrete_field import ( DiscreteScalarField, DiscreteScalarFieldView, ) ivfields = set( filter( lambda f: f.backend.kind == Backend.OPENCL, self.input_discrete_fields.values(), ) ) ovfields = set( filter( lambda f: f.backend.kind == Backend.OPENCL, self.output_discrete_fields.values(), ) ) check_instance(ivfields, set, values=DiscreteScalarFieldView) check_instance(ovfields, set, values=DiscreteScalarFieldView) vfields = ivfields.union(ovfields) if vfields: assert self.cl_env is not None, "No opencl environment has been given." from hysop.backend.device.opencl.opencl_env import OpenClEnvironment check_instance(self.cl_env, OpenClEnvironment) from hysop.backend.device.opencl import cl ifields = {f.dfield for f in ivfields} ofields = {f.dfield for f in ovfields} check_instance(ifields, set, values=DiscreteScalarField) check_instance(ofields, set, values=DiscreteScalarField) fields = ifields.union(ofields) for field in fields: flags = 0 if field in ifields: flags |= cl.map_flags.READ if field in ofields: flags |= cl.map_flags.WRITE assert field._data is not None self.register_mappable_object( key=field, obj=field._data.handle, flags=flags ) for vfield in vfields: self.register_data_getter( get_key=vfield, obj_key=vfield.dfield, getter=vfield._compute_data_view, )
[docs] def register_mappable_object(self, key, obj, flags): from hysop.backend.device.opencl import clArray msg = 'Device memory object "{}" has already been registered.' msg = msg.format(key) assert key not in self.__registered_objects, msg check_instance(obj, clArray.Array) self.__registered_objects[key] = (obj, flags)
[docs] def register_data_getter(self, get_key, obj_key, getter): assert callable(getter) msg = 'Device memory getter "{}" has already been registered as an object.' msg = msg.format(get_key) assert get_key not in self.__registered_objects, msg msg = 'Device memory getter "{}" has already been registered as a getter.' msg = msg.format(get_key) assert get_key not in self.__registered_getters, msg msg = 'Device memory object "{}" has not been registered.' msg = msg.format(obj_key) assert obj_key in self.__registered_objects, msg self.__registered_getters[get_key] = (obj_key, getter)
[docs] def map_objects(self, queue, is_blocking): DEBUG = False msg = "Device memory objects have already been mapped to host." assert not self.__mapped, msg evt = None for obj_key, (dev_buf, flags) in self.__registered_objects.items(): if DEBUG: msg = f"Mapping {obj_key.full_tag}..." print(msg) if is_blocking: host_buf = dev_buf.map_to_host( queue=queue, is_blocking=is_blocking, flags=flags ) else: host_buf, evt = dev_buf.map_to_host( queue=queue, is_blocking=is_blocking, flags=flags ) self.__mapped_objects[obj_key] = host_buf for get_key, (obj_key, getter) in self.__registered_getters.items(): if DEBUG: msg = f"Applying getter {get_key.full_tag} to mapped buffer {obj_key.full_tag}..." print(msg) self.__mapped_objects[get_key] = getter(self.__mapped_objects[obj_key]) self.__mapped = True return evt
[docs] def unmap_objects(self, force=False): msg = "Device memory objects have already been unmapped from host." assert force or self.__mapped, msg self.__mapped_objects.clear() self.__mapped = False
[docs] def get_mapped_object(self, key): msg = "Device memory objects have not been mapped to host yet." assert self.__mapped, msg msg = 'Device memory object "{}" has not been mapped.' msg = msg.format(key) assert key in self.__mapped_objects, msg return self.__mapped_objects[key]
[docs] def build_object_getter(self, key): import functools msg = 'Device memory object "{}" has not been registered.' msg = msg.format(key) assert key in self.__registered_objects, msg return functools.partial(self.get_mapped_object, key=key)
[docs] @contextmanager def map_objects_to_host(self, queue=None, is_blocking=True): if self.__registered_objects: assert self.cl_env is not None queue = first_not_None(queue, self.cl_env.default_queue) try: evt = self.map_objects(queue, is_blocking) yield OpenClMappedMemoryObjectGetter(self, evt) except: raise finally: self.unmap_objects() else: try: yield except: raise